package com.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that adds up the up-flow and down-flow of every {@link FlowBean} received for a
 * key and writes one aggregated {@link FlowBean} per key.
 *
 * <p>Input and output key/value types are both {@code Text} / {@code FlowBean}.
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    // Single output bean shared by all reduce() calls; its fields are overwritten for each key.
    private final FlowBean outV = new FlowBean();

    /**
     * Sums the up-flow and down-flow of all beans grouped under {@code key} and emits the totals.
     *
     * @param key     grouping key (a phone number, per the original author's note)
     * @param values  every {@link FlowBean} the framework grouped under {@code key}
     * @param context task context the aggregated record is written to
     * @throws IOException          declared by the overridden method; may propagate from {@code context.write}
     * @throws InterruptedException declared by the overridden method; may propagate from {@code context.write}
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        // Running totals of up-flow and down-flow for this key.
        // Uppercase 'L' suffix: a lowercase 'l' is easily misread as the digit 1.
        long totalUp = 0L;
        long totalDown = 0L;

        // By the time reduce() runs, all FlowBeans sharing this key have been grouped together;
        // iterating yields the records produced by the mapper.
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }

        // Store the totals in the shared output bean.
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        // NOTE(review): presumably derives the sum from the up/down values just set — confirm in FlowBean.
        outV.setSumFlow();

        // Emit the aggregated record for this key.
        context.write(key, outV);
    }
}
